1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: JmsMessageConsumer.java,v 1.4 2007/01/24 12:00:28 tanderson Exp $
44 */
45 package org.exolab.jms.client;
46
47 import javax.jms.Destination;
48 import javax.jms.JMSException;
49 import javax.jms.Message;
50 import javax.jms.MessageConsumer;
51 import javax.jms.MessageListener;
52
53 import org.apache.commons.logging.Log;
54 import org.apache.commons.logging.LogFactory;
55
56 import org.exolab.jms.message.MessageImpl;
57
58 import java.rmi.RemoteException;
59
60
61 /***
62 * Client implementation of the <code>javax.jms.MessageConsumer</code>
63 * interface.
64 *
65 * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
66 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67 * @version $Revision: 1.4 $ $Date: 2007/01/24 12:00:28 $
68 */
69 class JmsMessageConsumer
70 implements JmsMessageListener, MessageConsumer {
71
72 /***
73 * The session which created this.
74 */
75 private JmsSession _session = null;
76
77 /***
78 * The consumer's identity, allocated by the server.
79 */
80 private final long _consumerId;
81
82 /***
83 * The destination to receive messages from.
84 */
85 private final Destination _destination;
86
87 /***
88 * A message listener may be assigned to this session, for asynchronous
89 * message delivery.
90 */
91 private MessageListener _listener = null;
92
93 /***
94 * The message selector, for filtering messages. May be <code>null</code>.
95 */
96 private String _selector = null;
97
98 /***
99 * Indicates if the session is closed.
100 */
101 private volatile boolean _closed = false;
102
103 /***
104 * The logger.
105 */
106 private static final Log _log =
107 LogFactory.getLog(JmsMessageConsumer.class);
108
109
110 /***
111 * Construct a new <code>JmsMessageProducer</code>.
112 *
113 * @param session the session responsible for the consumer
114 * @param consumerId the identity of this consumer
115 * @param destination the destination to receive messages from
116 * @param selector the message selector. May be <code>null</code
117 */
118 public JmsMessageConsumer(JmsSession session, long consumerId,
119 Destination destination, String selector) {
120 if (session == null) {
121 throw new IllegalArgumentException("Argument 'session' is null");
122 }
123 if (destination == null) {
124 throw new IllegalArgumentException(
125 "Argument 'destination' is null");
126 }
127 _session = session;
128 _consumerId = consumerId;
129 _destination = destination;
130 _selector = selector;
131 }
132
133 /***
134 * Return the message consumer's message selector expression.
135 *
136 * @return the selector expression, or <code>null</code> if one isn't set
137 */
138 public String getMessageSelector() {
139 return _selector;
140 }
141
142 /***
143 * Return the consumer's listener.
144 *
145 * @return the listener for the consumer, or <code>null</code> if there
146 * isn't one set
147 */
148 public MessageListener getMessageListener() {
149 return _listener;
150 }
151
152 /***
153 * Set the consumer's listener.
154 *
155 * @param listener the message listener, or <code>null</code> to deregister
156 * an existing listener
157 * @throws JMSException if the listener cannot be set
158 */
159 public void setMessageListener(MessageListener listener)
160 throws JMSException {
161
162
163 if (listener != null) {
164 if (_listener == null) {
165
166 _listener = listener;
167 _session.setMessageListener(this);
168 } else {
169
170
171 _listener = listener;
172 }
173 } else {
174 if (_listener != null) {
175 _session.removeMessageListener(this);
176 _listener = listener;
177 }
178 }
179 }
180
181 /***
182 * Receive the next message produced for this consumer. This call blocks
183 * indefinitely until a message is produced or until this message consumer
184 * is closed.
185 *
186 * @return the next message produced for this consumer, or <code>null</code>
187 * if this consumer is concurrently closed
188 * @throws JMSException if the next message can't be received
189 */
190 public Message receive() throws JMSException {
191 return receive(0);
192 }
193
194 /***
195 * Receive the next message that arrives within the specified timeout
196 * interval. This call blocks until a message arrives, the timeout expires,
197 * or this message consumer is closed. A timeout of zero never expires and
198 * the call blocks indefinitely.
199 *
200 * @param timeout the timeout interval, in milliseconds
201 * @return the next message produced for this consumer, or <code>null</code>
202 * if the timeout expires or the consumer concurrently closed
203 * @throws JMSException if the next message can't be received
204 */
205 public Message receive(long timeout) throws JMSException {
206 checkReceive();
207 return _session.receive(_consumerId, timeout);
208 }
209
210 /***
211 * Receive the next message if one is immediately available.
212 *
213 * @return the next message produced for this consumer, or <code>null</code>
214 * if one is not available
215 * @throws JMSException if the next message can't be received
216 */
217 public Message receiveNoWait() throws JMSException {
218 checkReceive();
219 return _session.receiveNoWait(_consumerId);
220 }
221
222 /***
223 * Close the consumer. This call blocks until a receive or message listener
224 * in progress has completed. A blocked consumer receive call returns
225 * <code>null</code> when this consumer is closed.
226 *
227 * @throws JMSException if this consumer can't be closed
228 */
229 public synchronized void close() throws JMSException {
230 if (!_closed) {
231 try {
232 _closed = true;
233 _session.removeConsumer(this);
234
235
236 notifyAll();
237 } finally {
238 _listener = null;
239 _session = null;
240 _selector = null;
241 }
242 }
243 }
244
245 /***
246 * Deliver a message.
247 *
248 * @param message the message to deliver
249 * @return <code>true</code> if the message was delivered; otherwise
250 * <code>false</code>.
251 */
252 public boolean onMessage(MessageImpl message) {
253 boolean delivered = false;
254 try {
255 if (_listener != null) {
256 _listener.onMessage(message);
257 delivered = true;
258 } else {
259 _log.error("NessageListener no longer registered");
260 }
261 } catch (Throwable exception) {
262 _log.error("MessageListener threw exception", exception);
263 }
264 return delivered;
265 }
266
267 /***
268 * Informs the session that there is a message available for a synchronous
269 * consumer.
270 */
271 public void onMessageAvailable() throws RemoteException {
272
273 }
274
275 /***
276 * Returns the destination to receive messages from.
277 *
278 * @return the destination to receive messages from
279 */
280 protected Destination getDestination() {
281 return _destination;
282 }
283
284 /***
285 * Returns the identity of this consumer.
286 *
287 * @return the identity of this consumer
288 */
289 protected long getConsumerId() {
290 return _consumerId;
291 }
292
293 /***
294 * Returns the session that created this consumer.
295 *
296 * @return the session that created this consumer
297 */
298 protected JmsSession getSession() {
299 return _session;
300 }
301
302 /***
303 * Determines if the consumer can perform receives.
304 *
305 * @throws JMSException if the consumer can't perform a receive
306 */
307 private void checkReceive() throws JMSException {
308 if (_listener != null) {
309
310 throw new JMSException("Can't receive when listener defined");
311 }
312
313 if (_closed) {
314 throw new JMSException("Can't receive when session closed");
315 }
316 }
317
318 }